#include <time.h>

#include "volume_proto.h"
#include "list.h"

#include "lsv.h"
#include "lsv_log.h"
#include "lsv_bitmap.h"
#include "lsv_wbuffer.h"
#include "lsv_wbuffer_internal.h"
#include "lsv_rcache.h"
#include "lsv_volume.h"
#include "lsv_help.h"
#include "fnotify.h"

#define LSV_WAL2_PATH                   "/dev/shm/lich4/lsv/feature/wal2"
#define LSV_SPIRAL_WRITE_PATH           "/dev/shm/lich4/lsv/feature/spiral_write"
#define LSV_WBUF_PIPELINE_PATH          "/dev/shm/lich4/lsv/feature/wbuf_pipeline"
#define LSV_WBUF_READ_PATH              "/dev/shm/lich4/lsv/feature/wbuf_read"

#define LSV_LOG_PATH                    "/dev/shm/lich4/lsv/feature/log"

#define LSV_BITMAP_PATH                 "/dev/shm/lich4/lsv/feature/bitmap"
#define LSV_BITMAP_GROUP_COMMIT_PATH    "/dev/shm/lich4/lsv/feature/bitmap_group_commit"

#define LSV_GC_PATH                     "/dev/shm/lich4/lsv/feature/gc"
#define LSV_BITMAP_GC_VALUE_UP_PATH     "/dev/shm/lich4/lsv/feature/bitmap_gc_value_up"

#define LSV_FEATURE_SWITCH TRUE

uint32_t lsv_feature = 0;

int volume_proto_read_header(lsv_volume_proto_t *lsv_info, const fileid_t *parent, const fileid_t *fileid, const char *snap, void *header);

static void __init_lsv_info(lsv_volume_proto_t *lsv_info) {
        lsv_info->write_count = 0;

        lsv_info->wbuf_write_count = 0;
        lsv_info->wbuf_write_end_count = 0;

        lsv_info->log_write_count = 0;
        lsv_info->log_commit_count = 0;

        lsv_info->bitmap_write_count = 0;
        lsv_info->bitmap_batch_write_count = 0;

        lsv_info->input_bytes = 0;
        lsv_info->log_bytes = 0;
        lsv_info->commit_bytes = 0;

        lsv_info->read_wbuf_hit = 0;
        lsv_info->read_wbuf_miss = 0;
        lsv_info->read_page_miss = 0;
        lsv_info->read_rc_miss = 0;

        ltable_init(&lsv_info->lock_table, "lsv");
        lba_kv_init(&lsv_info->io_hash);
        lba_fifo_init(&lsv_info->lba_fifo);

        lsv_info->row3_tail.chunk_id = 0;
        lsv_info->row3_tail.chunk_off = 0;

        count_list_init(&lsv_info->io_stream);
        lsv_rwlock_init(&lsv_info->io_lock);
        lsv_rwlock_init(&lsv_info->info_lock);

        time_count_init(&lsv_info->tc, "time_count", 1000000);

        memset(&lsv_info->share, 0, sizeof(lsv_info->share));
        memset(&lsv_info->op_stat, 0, sizeof(op_stat_t) * OP_MAX);

        chunk_pool_init((chunk_pool **)&lsv_info->chunk_pool, lsv_info, 4096);
}

inline void lsv_info_stat(lsv_volume_proto_t *lsv_info) {
        lsv_wbuf_t *wbuf = lsv_info->wbuf;

        uint64_t total = lsv_info->wbuf_write_count +
                         lsv_info->wbuf_write_end_count +
                         lsv_info->log_commit_count +
                         lsv_info->bitmap_write_count +
                         lsv_info->bitmap_batch_write_count;
        DERROR("vol %ju lsn %ju io %ju total %ju [wb %ju log %ju bm %ju] wb [%u %u/%u %u] pl %u %u"
               " read miss [wb %ju rp %ju %ju %ju] used [%ju %ju %ju %ju %ju] con [%ju %ju]\n",
               lsv_info->ino,
               wbuf->last_lsn,
               lsv_info->write_count,
               total,
               lsv_info->wbuf_write_count,
               lsv_info->log_write_count,
               lsv_info->bitmap_write_count,

               wbuf->free_list.count,
               wbuf->wait_list.count,
               wbuf->wal->free_list.count,
               wbuf->wal->wait_list.count,
               wbuf->pipeline.log_queue.count,
               wbuf->pipeline.bitmap_queue.count,

               lsv_info->read_wbuf_hit,
               lsv_info->read_wbuf_miss,
               lsv_info->read_page_miss,
               lsv_info->read_rc_miss,

               used_per_op(&lsv_info->op_stat[MALLOC_CHUNK]),
               used_per_op(&lsv_info->op_stat[WRITE_4K]),
               used_per_op(&lsv_info->op_stat[WRITE_8K]),
               used_per_op(&lsv_info->op_stat[WRITE_1M]),
               used_per_op(&lsv_info->op_stat[WRITE_BITMAP]),

               lsv_info->share.malloc_chunk_enter_count,
               lsv_info->share.write_log_enter_count
        );
}

int lsv_info_check_ready(lsv_volume_proto_t *lsv_info) {
        int ret;

        #if LSV_LOW_VOL_TEST
        return 0;
        #endif 
        if (!lsv_bitmap_is_ready(lsv_info)) {
                ret = ENOENT;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

void lsv_info_print_io_entry(lsv_volume_proto_t *lsv_info, lsv_io_entry_t *io_entry) {
        int left, step;
        uint64_t off;

        (void) lsv_info;
        int count = 0;
        off = io_entry->io->offset;
        left = io_entry->io->size;
        while (left > 0) {
                step = LSV_PAGE_SIZE - off % LSV_PAGE_SIZE;
                step = _min(step, left);

                DINFO_NOP("seq %u io %p lsn %ju off <%ju, %u> %u lba %ju page off <%ju, %u>\n",
                      count,
                      io_entry->io,
                      io_entry->io->lsn,
                      io_entry->io->offset,
                      io_entry->io->size,
                      io_entry->buffer->len,
                      OFF2LBA(off),
                      off,
                      step);

                off += step;
                left -= step;
                count++;
        }
}

void lsv_info_print_io_stream(lsv_volume_proto_t *lsv_info) {
        struct list_head *pos, *n;
        lsv_io_entry_t *io_entry;

        DINFO("total %u\n", lsv_info->io_stream.count);
        list_for_each_safe(pos, n, &lsv_info->io_stream.list) {
                io_entry = list_entry(pos, lsv_io_entry_t, hook);
                // YASSERT(io_entry->io->size == io_entry->buffer->len);

                lsv_info_print_io_entry(lsv_info, io_entry);
        }
}

int lsv_info_set_poweroff(lsv_volume_proto_t *lsv_info, uint32_t power_off) {

        // 卷控制器重新加载　新的已经加载完　旧的设置poweroff　需要写数据　这里会出现错误
        return 0;

        volume_proto_t *volume_proto = lsv_info->volume_proto;
        table1_t *table1 = &volume_proto->table1;

        table1->fileinfo.system_power_off = power_off;
        table1->table_proto->setinfo(table1->table_proto, &table1->fileinfo,
                                     sizeof(fileinfo_t), TABLE_PROTO_INFO_ATTR);

        DINFO("set power=%u\n", power_off);
        return 0;
}

int lsv_info_format(lsv_volume_proto_t *lsv_info) {

        YASSERT(lsv_info->u.start_mode == LSV_SYS_CREATE);

        uint64_t parent_id = lsv_info->volume_proto->table1.fileinfo.source;
        const char *snap = lsv_info->volume_proto->table1.fileinfo.snap;

        __init_lsv_info(lsv_info);

        int ret = lsv_volume_init(lsv_info, LSV_SYS_CREATE);
        if (ret < 0) {
                DINFO("lsv_init:create volume error:%d\n", ret);
                return ret;
        }

        // chunk_id: 65
        ret = lsv_log_init(lsv_info, LSV_SYS_CREATE);
        if (ret < 0) {
                DINFO("lsv_init:init log error:%d\n", ret);
                return ret;
        }

        // chunk_id: 66
        if (!parent_id)
                ret = lsv_bitmap_init(lsv_info, LSV_SYS_CREATE, 0);
        else {
                fileid_t parent, fileid;
                mk_volid(&parent, parent_id);
                mk_volid(&fileid, parent_id);
                struct lsv_bitmap_header *header = malloc(sizeof(struct lsv_bitmap_header) + BITMAP_CHUNK_HEADER_SIZE(lsv_info->size));

                ret = volume_proto_read_header(lsv_info, &parent, &fileid, snap, header);
                if (ret < 0) {
                        DINFO("lsv_init:volume_proto_read_header error:%d\n", ret);
                        free(header);
                        return ret;
                }
                ret = lsv_bitmap_init_by_buf(lsv_info, parent_id, header, 0);
                if (ret < 0) {
                        DINFO("lsv_init:lsv_bitmap_init_by_buf error:%d\n", ret);
                        free(header);
                        return ret;
                }
                free(header);
        }

        if (ret < 0) {
                DINFO("lsv_init:init bitmap error:%d\n", ret);
                return ret;
        }

        // chunk_id: 67-4162
        ret = lsv_rcache_init(lsv_info);
        if (ret < 0) {
                DINFO("lsv_init:init rcache error:%d\n", ret);
                return ret;
        }

        // wal
        ret = lsv_wbuffer_init(lsv_info, LSV_SYS_CREATE);
        if (ret < 0) {
                DINFO("lsv_init:init wbuffer error:%d\n", ret);
                return ret;
        }

        return 0;
}

int lsv_info_init(lsv_volume_proto_t *lsv_info) {

        YASSERT(lsv_info->u.start_mode == LSV_SYS_LOAD || lsv_info->u.start_mode == LSV_SYS_RECOVERY);

        DINFO("start mode %d\n", lsv_info->u.start_mode);

        LSV_TEST_TIME_BEGIN(lsv_init);

        __init_lsv_info(lsv_info);

        LSV_TEST_TIME_BEGIN(lsv_init_volume);
        DINFO("lsv_volume_init: %p\n", lsv_info);
        int ret = lsv_volume_init(lsv_info, lsv_info->u.start_mode);
        if (ret < 0) {
                DINFO("lsv_init:create volume error:%d\n", ret);
                return ret;
        }
        LSV_TEST_TIME_END(lsv_init_volume);

        LSV_TEST_TIME_BEGIN(lsv_init_log);
        DINFO("lsv_log_init: %p\n", lsv_info);
        ret = lsv_log_init(lsv_info, lsv_info->u.start_mode);
        if (ret < 0) {
                DINFO("lsv_init:init log error:%d\n", ret);
                return ret;
        }
        LSV_TEST_TIME_END(lsv_init_log);

        LSV_TEST_TIME_BEGIN(lsv_init_bitmap);
        DINFO("lsv_bitmap_init: %p\n", lsv_info);
        ret = lsv_bitmap_init(lsv_info, lsv_info->u.start_mode, 0);
        if (ret < 0) {
                DINFO("lsv_init:init bitmap error:%d\n", ret);
                return ret;
        }
        LSV_TEST_TIME_END(lsv_init_bitmap);

        LSV_TEST_TIME_BEGIN(lsv_init_rcache);

        // wbuffer REDO会引发gc过程，gc依赖于rcache，所以需要先行初始化.
        // @sa lsv_rcache_clean
        DINFO("lsv_rcache_init: %p\n", lsv_info);
        ret = lsv_rcache_init(lsv_info);
        if (ret < 0) {
                DINFO("lsv_init:init rcache error:%d\n", ret);
                return ret;
        }
        LSV_TEST_TIME_END(lsv_init_rcache);

        LSV_TEST_TIME_BEGIN(lsv_init_wbuffer);
        DINFO("lsv_wbuffer_init: %p\n", lsv_info);
        ret = lsv_wbuffer_init(lsv_info, lsv_info->u.start_mode);
        if (ret < 0) {
                DINFO("lsv_init:init wbuffer error:%d\n", ret);
                return ret;
        }
        LSV_TEST_TIME_END(lsv_init_wbuffer);

#if 0
        ret = lsv_volumep_dump(lsv_info);
        if (ret < 0) {
                DINFO("lsv_init:update volume error:%d\n", ret);
                return ret;
        }
#endif

        LSV_TEST_TIME_END(lsv_init);
        return 0;
}

int lsv_info_destroy(lsv_volume_proto_t *lsv_info) {
        int ret = 0;

        // first flush, then destroy

        ret = lsv_wbuffer_flush(lsv_info);
        if (ret < 0) {
                GOTO(err_ret, ret);
        }

        ret = lsv_log_flush(lsv_info);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = lsv_volume_flush(lsv_info);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        // 设置正常关机标志
        lsv_info_set_poweroff(lsv_info, 1);

        // flush完成

        DINFO("lsv_wbuffer_destroy\n");
        ret = lsv_wbuffer_destroy(lsv_info);
        if (ret < 0) {
                GOTO(err_ret, ret);
        }

        DINFO("lsv_rcache_release\n");
        ret = lsv_rcache_release(lsv_info);
        if (ret < 0) {
                GOTO(err_ret, ret);
        }

        DINFO("lsv_log_destroy\n");
        ret = lsv_log_destroy(lsv_info);
        if (ret < 0) {
                GOTO(err_ret, ret);
        }

        DINFO("lsv_bitmap_deinit\n");
        ret = lsv_bitmap_deinit(lsv_info);
        if (ret < 0) {
                GOTO(err_ret, ret);
        }

        DINFO("lsv_volume_delete\n");
        ret = lsv_volume_delete(lsv_info);
        if (ret < 0) {
                GOTO(err_ret, ret);
        }

        // not malloc'ed
        // free(lsv_info);

        lsv_rwlock_destroy(&lsv_info->io_lock);

        DINFO("finish\n");

        return 0;
err_ret:
        return ret;
}

int lsv_info_setattr(lsv_volume_proto_t *lsv_info) {
        int ret = 0;

        ret = lsv_log_setattr(lsv_info);
        if (ret < 0) {
                DINFO("setattr log error:%d\n", ret);
                goto ERR;
        } 

        ERR: return ret;
}

static int __lsv_feature_switch(const char *buf, uint32_t extra)
{
        int ret, on;

        on = atoi(buf);

        if (on == 0) {
                lsv_feature &= ~extra;
        } else if (on == 1) {
                lsv_feature |= extra;
        } else {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        DINFO("fnotify file modified: mask (0x%08x) %s\n", extra, on ? "on" : "off");

        return 0;
err_ret:
        return ret;
}

int lsv_conf_init()
{
#if 0
        DINFO("disable lsv feature switch\n");
        lsv_feature = LSV_FEATURE_WAL2 |
                      LSV_FEATURE_WBUF_PIPELINE |
                      LSV_FEATURE_WBUF_READ |
                      LSV_FEATURE_BITMAP |
                      LSV_FEATURE_BITMAP_GC_VALUE_UP |
                      LSV_FEATURE_BITMAP_GROUP_COMMIT |
                      LSV_FEATURE_LOG |
                      LSV_FEATURE_GC;
        return 0;
#endif

        int ret;

        DINFO("enable lsv feature switch\n");

        ret = fnotify_create(LSV_WAL2_PATH, "1", __lsv_feature_switch, LSV_FEATURE_WAL2);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /*
        ret = fnotify_create(LSV_SPIRAL_WRITE_PATH, "1", __lsv_feature_switch, LSV_FEATURE_SPIRAL_WRITE);
        if (unlikely(ret))
                GOTO(err_ret, ret);
        */

        ret = fnotify_create(LSV_WBUF_PIPELINE_PATH, "1", __lsv_feature_switch, LSV_FEATURE_WBUF_PIPELINE);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = fnotify_create(LSV_WBUF_READ_PATH, "1", __lsv_feature_switch, LSV_FEATURE_WBUF_READ);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = fnotify_create(LSV_LOG_PATH, "1", __lsv_feature_switch, LSV_FEATURE_LOG);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = fnotify_create(LSV_BITMAP_PATH, "1", __lsv_feature_switch, LSV_FEATURE_BITMAP);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = fnotify_create(LSV_BITMAP_GROUP_COMMIT_PATH, "1", __lsv_feature_switch, LSV_FEATURE_BITMAP_GROUP_COMMIT);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = fnotify_create(LSV_GC_PATH, "1", __lsv_feature_switch, LSV_FEATURE_GC);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = fnotify_create(LSV_BITMAP_GC_VALUE_UP_PATH, "1", __lsv_feature_switch, LSV_FEATURE_BITMAP_GC_VALUE_UP);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}
